package com.mall.order.consumer;


import com.alibaba.fastjson.JSON;
import com.mall.order.OrderPromoService;
import com.mall.order.constant.OrderRetCode;
import com.mall.order.dto.CreateSeckillOrderRequest;
import com.mall.order.dto.CreateSeckillOrderResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.Reference;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@Component
@Slf4j
public class PromoOrderConsumer {

    @Value("${mq.nameserver.addr}")
    private String addr;

    private final static String topicName = "promo_order";

    private DefaultMQPushConsumer mqConsumer;

    @Reference
    private OrderPromoService orderPromoService;

    @PostConstruct
    public void init() throws MQClientException {
        mqConsumer = new DefaultMQPushConsumer("promo_order_group");
        mqConsumer.setNamesrvAddr(addr);
        mqConsumer.subscribe(topicName, "*");

        MessageListenerConcurrently listener = (msgs, context) -> {

            byte[] body = msgs.get(0).getBody();
            String bodyStr = new String(body);

            // 下单
            Map<String, Object> map = JSON.parseObject(bodyStr, Map.class);
            String username = (String) map.get("username");
            Integer userId = (Integer) map.get("userId");
            Integer productId = (Integer) map.get("productId");
            BigDecimal price = (BigDecimal) map.get("price");

            Integer addressId = (Integer) map.get("addressId");
            String tel = (String) map.get("tel");
            String streetName = (String) map.get("streetName");


            CreateSeckillOrderRequest createSeckillOrderRequest = new CreateSeckillOrderRequest();
            createSeckillOrderRequest.setUserId(userId.longValue());
            createSeckillOrderRequest.setProductId(productId.longValue());
            createSeckillOrderRequest.setUsername(username);
            createSeckillOrderRequest.setPrice(price);
            createSeckillOrderRequest.setAddressId(Long.valueOf(addressId));
            createSeckillOrderRequest.setTel(tel);
            createSeckillOrderRequest.setStreetName(streetName);
            log.info("秒杀创建订单接口参数request:{}",JSON.toJSONString(createSeckillOrderRequest));
            CreateSeckillOrderResponse promoOrder = orderPromoService.createPromoOrder(createSeckillOrderRequest);
            if (promoOrder.getCode().equals(OrderRetCode.SUCCESS.getCode())) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }else {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }


        };
        mqConsumer.registerMessageListener(listener);
        mqConsumer.start();
    }
}
